Go 调用 kafka
这里使用的客户端是 kafka-go
import "github.com/segmentio/kafka-go"
发送消息
使用 kafka.Writer
向 Kafka 服务器发送消息。
创建 Writer
w := kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
WriteTimeout: writeTimeout,
}
其中:
- brokers 是 Kafka 服务地址列表
- topic 是主题
- writeTimeout 是超时时间
发送消息,如果发送失败,err 不为空
err := w.WriteMessages(
context.Background(),
kafka.Message{
Value: message,
},
)
消息发送完毕后,手动关闭 Writer
w.Close()
接收消息
使用 kafka.Reader 从 Kafka 服务器接收消息。
r = kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupId,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
其中 groupId 是 consumer group 名称,每条消息只会向同一个 group 中某个 consumer 发送一次。
因为接收消息的程序要持续运行,所以使用 defer 语句执行关闭操作
defer r.Close()
使用无限 for 循环读取并打印每条消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
log.Printf("%s", string(m.Value))
}
运行流程如下图所示: